package com.parkingwang.learning.subjecttype;

import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.*;

/**
 * Small console demos for the four RxJava {@code Subject} flavours:
 * {@link AsyncSubject}, {@link BehaviorSubject}, {@link ReplaySubject} and
 * {@link PublishSubject}.
 *
 * <p>Subjects do not support backpressure; when backpressure is needed, use the
 * corresponding Processor types instead.
 */
public class SubjectType {

    /**
     * Entry point. Runs a single demo; to try another one, replace the call
     * below with {@code asyncSubject()}, {@code behaviorSubject()} or
     * {@code replaySubject()}.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        publishSubject();
    }

    /**
     * AsyncSubject demo.
     *
     * <p>The observer receives only the last item emitted before
     * {@code onComplete()}. Nothing is delivered until {@code onComplete()} is
     * called; without it the observer never receives any item.
     */
    private static void asyncSubject() {
        final AsyncSubject<String> async = AsyncSubject.create();

        // Items emitted before the observer subscribes.
        async.onNext("1");
        async.onNext("2");
        // Calling async.onComplete() here instead would make "2" the final item.

        subscribeFun(async);

        // Items emitted after subscribing; only the last one survives completion.
        async.onNext("3");
        async.onNext("4");
        async.onComplete();
    }

    /**
     * BehaviorSubject demo.
     *
     * <p>The observer first receives the most recent item emitted before it
     * subscribed, followed by everything emitted afterwards. If nothing was
     * emitted before subscription, the default value is delivered instead.
     * {@code BehaviorSubject.create()} is the variant without a default value.
     */
    private static void behaviorSubject() {
        final BehaviorSubject<String> behavior = BehaviorSubject.createDefault("0");
        behavior.onNext("1");

        subscribeFun(behavior);

        behavior.onNext("2");
        behavior.onNext("3");
    }

    /**
     * ReplaySubject demo. This is the most flexible of the four subjects.
     *
     * <ul>
     *   <li>{@code create()}: the observer receives every item, whether it was
     *       emitted before or after subscription.</li>
     *   <li>{@code createWithSize(N)}: the observer receives only the last N
     *       items emitted before subscription, plus everything afterwards.</li>
     *   <li>{@code createWithTime(...)}: bounds the cache by time.
     *       TODO: no demo for this variant yet.</li>
     * </ul>
     */
    private static void replaySubject() {
        // Unbounded alternative: ReplaySubject.create()
        final ReplaySubject<String> replay = ReplaySubject.createWithSize(2);

        replay.onNext("1");
        replay.onNext("2");
        replay.onNext("3");

        subscribeFun(replay);

        replay.onNext("4");
    }

    /**
     * PublishSubject demo.
     *
     * <p>The observer receives only items emitted after it subscribed. If
     * {@code onComplete()} was already called before subscription, the observer
     * receives no items at all.
     */
    private static void publishSubject() {
        final PublishSubject<String> publisher = PublishSubject.create();

        // Emitted before anyone subscribes.
        publisher.onNext("1");
        publisher.onNext("2");
        publisher.onNext("3");
        // Calling publisher.onComplete() here would complete the subject before subscription.

        subscribeFun(publisher);

        publisher.onNext("4");
    }

    /**
     * Subscribes a console-printing observer to the given subject: items are
     * printed with a {@code next---} prefix, errors via {@code toString()},
     * and completion as {@code complete}.
     *
     * @param subject the subject to observe
     */
    private static void subscribeFun(Subject<String> subject) {
        final Consumer<String> printItem = new Consumer<String>() {
            @Override
            public void accept(String item) throws Exception {
                System.out.println("next---" + item);
            }
        };

        final Consumer<Throwable> printError = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable error) throws Exception {
                System.out.println(error.toString());
            }
        };

        final Action printComplete = new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("complete");
            }
        };

        subject.subscribe(printItem, printError, printComplete);
    }

}
